package com.xuzhiguang.light.gateway.core.filter;

import com.xuzhiguang.light.gateway.common.response.AbstractHttpGatewayResponse;
import com.xuzhiguang.light.gateway.common.response.GatewayResponse;
import com.xuzhiguang.light.gateway.core.context.GatewayContext;
import com.xuzhiguang.light.gateway.core.context.ModifiedGatewayRequestUtils;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.Request;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.Response;

import java.net.URI;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
 * @author xuzhiguang
 */
public class AsyncHttpRoutingGatewayFilter extends AbstractHttpRoutingGatewayFilter<Request, Response>{

    private final AsyncHttpClient httpClient;

    public AsyncHttpRoutingGatewayFilter(AsyncHttpClient httpClient) {
        this.httpClient = httpClient;
    }

    @Override
    protected Request toRoutingRequest(GatewayContext context) {

        URI uri = context.getRoute().getUri();
        uri = uri.resolve(ModifiedGatewayRequestUtils.getURI(context));

        RequestBuilder builder = new RequestBuilder(ModifiedGatewayRequestUtils.getMethod(context))
                .setUrl(uri.toString())
                .setBody(ModifiedGatewayRequestUtils.getBody(context));

        ModifiedGatewayRequestUtils.getHeaders(context).forEach(builder::addHeader);

        return builder.build();
    }

    @Override
    protected GatewayResponse toGatewayResponse(Response response) {
        return new AsyncGatewayResponse(response, System.currentTimeMillis());
    }

    @Override
    protected void doRequest(GatewayContext context, Request request, Consumer<ResponseWrapper<Response>> consumer) {
        CompletableFuture<Response> future = httpClient.executeRequest(request).toCompletableFuture();
        future.whenCompleteAsync((response, throwable)
                -> consumer.accept(new ResponseWrapper<>(response, throwable)));
    }


    private static class AsyncGatewayResponse extends AbstractHttpGatewayResponse {

        private Map<String, Object> headers;

        private final Response response;

        private final long responseTime;

        private AsyncGatewayResponse(Response response, long responseTime) {
            this.response = response;
            this.responseTime = responseTime;
        }

        @Override
        public long getResponseTime() {
            return responseTime;
        }

        @Override
        public Map<String, Object> getHeaders() {

            if (this.headers == null) {
                this.headers = new HashMap<>(response.getHeaders().size());

                Iterator<Map.Entry<CharSequence, CharSequence>> iterator =  response.getHeaders().iteratorCharSequence();
                while (iterator.hasNext()) {
                    Map.Entry<CharSequence, CharSequence> entry = iterator.next();
                    this.headers.put(String.valueOf(entry.getKey()), entry.getValue());
                }
            }
            return this.headers;
        }

        @Override
        public byte[] getBody() {
            return response.getResponseBodyAsBytes();
        }

        @Override
        public int getStatusCode() {
            return response.getStatusCode();
        }
    }
}
